-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Automatically restart memory-leaking workers when they reach a critical limit #4221
base: main
Are you sure you want to change the base?
Conversation
I think that the main question we need to determine is what policy we should use here. Should we always restart? Should we never restart but keep logging? Should we log for a while, but after it appears to not be getting any better (maybe five seconds) then restart? I like this last option personally. |
Also, thank you for taking the time to submit this. |
IMHO this should happen directly before starting to work on a new task. However, I wonder if there is an in-place restarting possibility for the worker.
|
Oh yeah, that actually sounds like a great idea.
Fortunately in this situation all of the data is already on disk, so we probably don't need to trouble ourselves with this. In principle I think that we would want to ...
This is a non-trivial engineering effort, but might be an interesting project for someone who wants to become more familiar with the worker-nanny-scheduler relationship. Does this work interest you @Hoeze ? |
I'd like to solve this problem and think it would save a lot of people (including me) a lot of time if this restarting works bomb-proof. I'm happy to invest another working day on this issue, but I think it would be better if someone with deeper knowledge could jump in here. |
I don't fully understand what is happening here but the function and leaky dictionary are stored in cache_loads on the worker. As an en experiment, I rewrote the function slightly and pull out the serialized function from the worker: x = {}
def memory_leaking_fn(data):
x[data] = random.randint(10,100)
x[data+str(random.randint(10,10000))] = 'foo'
print(x)
print(locals())
time.sleep(0.1)
return data In a separate process load the serialized function In [1]: import pickle
In [2]: ser_func = b"\x80\x04\x95\x93\x02\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\r_builtin_type\x94\x93\x94\x8c\nLambdaType\x94\x85\x94R\x94(h\x02\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x08KCCHt\x00\xa0\x01d\x01d\x02\xa1\x02t\x02|\x00<\x
...: 00d\x03t\x02|\x00t\x03t\x00\xa0\x01d\x01d\x04\xa1\x02\x83\x01\x17\x00<\x00t\x04t\x02\x83\x01\x01\x00t\x04t\x05\x83\x00\x83\x01\x01\x00t\x06\xa0\x07d\x05\xa1\x01\x01\x00|\x00S\x00\x94(NK\nKd\x8c\x03foo\x94M\x10'G?\xb9\x99\x99\x99\x99\x99\x9at\x94(\x8c\x06random\x94
...: \x8c\x07randint\x94\x8c\x01x\x94\x8c\x03str\x94\x8c\x05print\x94\x8c\x06locals\x94\x8c\x04time\x94\x8c\x05sleep\x94t\x94\x8c\x04data\x94\x85\x94\x8c\x07test.py\x94\x8c\x11memory_leaking_fn\x94K\nC\x0c\x00\x02\x10\x01\x18\x01\x08\x01\n\x02\n\x01\x94))t\x94R\x94}\x9
...: 4(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94\x8c\x08__file__\x94\x8c\x07test.py\x94uNNNt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h#}\x94}\x94(h\x1eh\x18\x8c\x0c__qualname__\x94h\x18\x8c\x0f__annot
...: ations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x1f\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94(h\x12h\x00\x8c\tsubimport\x94\x93\x94\x8c\x04time\x94\x85
...: \x94R\x94h\x0ch6h\x0c\x85\x94R\x94h\x0e}\x94uu\x86\x94\x86R0."
In [3]: func = pickle.loads(ser_func)
In [4]: func(str(1))
{'1': 15, '19359': 'foo'}
{'data': '1'}
Out[4]: '1'
In [5]: func(str(2))
{'1': 15, '19359': 'foo', '2': 97, '29211': 'foo'}
{'data': '2'}
Out[5]: '2'
In [6]: func.__globals__['x']
Out[6]: {'1': 15, '19359': 'foo', '2': 97, '29211': 'foo'}
In [7]: globals()['x']
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-7-9edb74b452e4> in <module>
----> 1 globals()['x']
KeyError: 'x' This is moving out of my knowledge but it looks like the global space of the deserialized function brings its own copy of the dictionary -- this probably one of the ways cloudpickle/pickle can bring in dependencies when serializing a function g which depends locally on a func f. |
@quasiben and I spoke about this offline. I think that he is trying to avoid the situation where cloudpickle is handling a function that leaks data in a closed-over variable. I think that this is not a good path to go down. I think that we need to assume that user code can leak in ways that we will never be able to resolve, and that we will need to occasionally restart in these situations. |
@@ -2678,6 +2678,7 @@ def check_pause(memory): | |||
if self.memory_limit is not None | |||
else "None", | |||
) | |||
await self.close_gracefully(restart=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm trying to anticipate cases where restarting might not be welcome. I can think of a couple:
- There is some slow cleanup process that needs a couple of seconds to clear memory out (I haven't seen this personally). Maybe it makes sense to wait a few times before restarting? This might not be worth the complexity though
- The worker is currently working on something, and we should give it a moment to try to finish it. This could be determined by checking the
self.executing
set of currently executing tasks. If this is empty then sure, restarting seems ok. If not, then we might want to wait for a bit, but not forever.
I agree that that would be good. It may not happen quickly though. There is a lot of work happening right now, and most maintainers are unfortunately quite busy these days. |
I would like to help but I lack knowledge. What's the best way to install this repo using pip? I tried to manually do the change in my site-package folder, since it's just one line and it does the trick. The workers restart when they reach the memory limit BUT the job fails with this traceback: 760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/venv/lib/python3.8/site-packages/distributed/client.py in _gather()
1849 exc = CancelledError(key)
1850 else:
-> 1851 raise exception.with_traceback(traceback)
1852 raise exc
1853 if errors == "skip":
KilledWorker: ('_fit_one_estimator-e733af06-4296-44e5-8a1d-28a876c9f9a0', <Worker 'tcp://10.27.2.239:37489', name: 45, memory: 0, processing: 46>) If this PR gets completed it would be a huge improvement for using |
Hi @gioxc88, installing dask-distributed requires (due to a reason I don't know) dask from master. I updated my pull request to current master and fixed the memory leak test. |
Thanks for the answer, I'll try again tomorrow. Are there any new development on the actual code (aside from the tests)? Many thanks |
Not from my side. I do have the same issues (#4193 (comment)). This patch only fixes one very specific problem in terms of external memory-leaking processes by trying to gracefully restart them. You can try to increase IMHO, what should be done to solve these issues:
This way, we don't care about worker failures any more. Results are known to have a certain size and reside at a secure location. |
This looks to solve a perennial issue for me. Is there any reason not to take this less-than-perfect solution for now and work on the nanny-restarts-worker solution in the longer term? If it is just a matter of implementing a delay and a check for currently executing tasks in the review comment I can do that. |
I expect that this PR would kill off long-running tasks that temporarily allocate a lot of RAM. I'm in the process of designing a more holistic and robust solution. |
If memory usage is high enough that we hit the pause fraction watermark and there are no keys to spill to disk, then I think what we could do is:
I think that if memory usage remained high after that sequence the worker should kill itself. The situation cannot reasonably be expected to improve if tasks are not cleaning up after themselves and gc cannot do it either. |
Attempt to fix #4193.
This pull request successfully restarts memory-leaking workers.
However, some workers still keep freezing.
Minimal example to reproduce memory leak outside of the test suite: